package com.asap.demo.sql;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.types.Row;

/**
 * @author wangbh
 * @Description: test
 * @date 2021/7/9 15:16
 */
public class FlinkSQLExample {
//    {"ORG_ID":"1","EVENT_NAME":"爬虫","EVENT_THREE_TYPE":"20399","SRC_PORT":"123","DST_PORT":"124","DST_IP":"10.16.254.11","SRC_IP":"50.115.134.50","CREATE_TIME":"2021-07-09T18:15:21.001Z","DEVICE_PARENT_TYPE":"WAF","SNOW_ID":"85512","EVENT_THREE_TYPE_DESC":"端口扫描","ts":"2021-05-27T16:06:58Z","ACCOUNT":"asap","collectionName":"bwdOMS","eRuleId":"0","RULE_TJ_COUNT":11,"TAGS" :{ "EVENT_ONE_TYPE" : "20000","DIRECTION" : "内部","EVENT_TWO_TYPE" : "20100","EVENT_THREE_TYPE" : "20101"},"DEVICE_TYPE":"OSM","DIRECTION":"0"}
//    {"ORG_ID":"1","EVENT_NAME":"爬虫","EVENT_THREE_TYPE":"20399","SRC_PORT":"123","DST_PORT":"124","DST_IP":"10.16.254.12","SRC_IP":"50.115.134.50","CREATE_TIME":"2021-07-09T18:20:23.001Z","DEVICE_PARENT_TYPE":"WAF","SNOW_ID":"85512","EVENT_THREE_TYPE_DESC":"端口扫描","ts":"2021-05-27T16:06:58Z","ACCOUNT":"asap","collectionName":"bwdOMS","eRuleId":"0","RULE_TJ_COUNT":11,"TAGS" :{ "EVENT_ONE_TYPE" : "20000","DIRECTION" : "内部","EVENT_TWO_TYPE" : "20100","EVENT_THREE_TYPE" : "20101"},"DEVICE_TYPE":"OSM","DIRECTION":"0"}
//    {"ORG_ID":"1","EVENT_NAME":"爬虫","EVENT_THREE_TYPE":"20399","SRC_PORT":"123","DST_PORT":"124","DST_IP":"10.16.254.13","SRC_IP":"50.115.134.50","CREATE_TIME":"2021-07-09T18:21:24.001Z","DEVICE_PARENT_TYPE":"WAF","SNOW_ID":"85512","EVENT_THREE_TYPE_DESC":"端口扫描","ts":"2021-05-27T16:06:58Z","ACCOUNT":"asap","collectionName":"bwdOMS","eRuleId":"0","RULE_TJ_COUNT":11,"TAGS" :{ "EVENT_ONE_TYPE" : "20000","DIRECTION" : "内部","EVENT_TWO_TYPE" : "20100","EVENT_THREE_TYPE" : "20101"},"DEVICE_TYPE":"OSM","DIRECTION":"0"}
//    {"ORG_ID":"1","EVENT_NAME":"爬虫","EVENT_THREE_TYPE":"20399","SRC_PORT":"123","DST_PORT":"124","DST_IP":"10.16.254.14","SRC_IP":"50.115.134.50","CREATE_TIME":"2021-07-09T18:22:24.001Z","DEVICE_PARENT_TYPE":"WAF","SNOW_ID":"85512","EVENT_THREE_TYPE_DESC":"端口扫描","ts":"2021-05-27T16:06:58Z","ACCOUNT":"asap","collectionName":"bwdOMS","eRuleId":"0","RULE_TJ_COUNT":11,"TAGS" :{ "EVENT_ONE_TYPE" : "20000","DIRECTION" : "内部","EVENT_TWO_TYPE" : "20100","EVENT_THREE_TYPE" : "20101"},"DEVICE_TYPE":"OSM","DIRECTION":"0"}
//    {"ORG_ID":"1","EVENT_NAME":"爬虫","EVENT_THREE_TYPE":"20399","SRC_PORT":"123","DST_PORT":"124","DST_IP":"10.16.254.16","SRC_IP":"50.115.134.50","CREATE_TIME":"2021-07-09T18:23:24.001Z","DEVICE_PARENT_TYPE":"WAF","SNOW_ID":"85512","EVENT_THREE_TYPE_DESC":"端口扫描","ts":"2021-05-27T16:06:58Z","ACCOUNT":"asap","collectionName":"bwdOMS","eRuleId":"0","RULE_TJ_COUNT":11,"TAGS" :{ "EVENT_ONE_TYPE" : "20000","DIRECTION" : "内部","EVENT_TWO_TYPE" : "20100","EVENT_THREE_TYPE" : "20101"},"DEVICE_TYPE":"OSM","DIRECTION":"0"}
//    {"ORG_ID":"1","EVENT_NAME":"爬虫","EVENT_THREE_TYPE":"20399","SRC_PORT":"123","DST_PORT":"124","DST_IP":"10.16.254.15","SRC_IP":"50.115.134.50","CREATE_TIME":"2021-07-09T18:26:25.001Z","DEVICE_PARENT_TYPE":"WAF","SNOW_ID":"85512","EVENT_THREE_TYPE_DESC":"端口扫描","ts":"2021-05-27T16:06:58Z","ACCOUNT":"asap","collectionName":"bwdOMS","eRuleId":"0","RULE_TJ_COUNT":11,"TAGS" :{ "EVENT_ONE_TYPE" : "20000","DIRECTION" : "内部","EVENT_TWO_TYPE" : "20100","EVENT_THREE_TYPE" : "20101"},"DEVICE_TYPE":"OSM","DIRECTION":"0"}
//    {"ORG_ID":"1","EVENT_NAME":"爬虫","EVENT_THREE_TYPE":"20399","SRC_PORT":"123","DST_PORT":"124","DST_IP":"10.16.254.11","SRC_IP":"50.115.134.50","CREATE_TIME":"2021-07-09T18:15:21.001Z","DEVICE_PARENT_TYPE":"TDA","SNOW_ID":"85512","EVENT_THREE_TYPE_DESC":"端口扫描","ts":"2021-05-27T16:06:58Z","ACCOUNT":"asap","collectionName":"bwdOMS","eRuleId":"0","RULE_TJ_COUNT":11,"TAGS" :{ "EVENT_ONE_TYPE" : "20000","DIRECTION" : "内部","EVENT_TWO_TYPE" : "20100","EVENT_THREE_TYPE" : "20101"},"DEVICE_TYPE":"OSM","DIRECTION":"0"}
//    {"ORG_ID":"1","EVENT_NAME":"爬虫","EVENT_THREE_TYPE":"20399","SRC_PORT":"123","DST_PORT":"124","DST_IP":"10.16.254.12","SRC_IP":"50.115.134.50","CREATE_TIME":"2021-07-09T18:20:23.001Z","DEVICE_PARENT_TYPE":"TDA","SNOW_ID":"85512","EVENT_THREE_TYPE_DESC":"端口扫描","ts":"2021-05-27T16:06:58Z","ACCOUNT":"asap","collectionName":"bwdOMS","eRuleId":"0","RULE_TJ_COUNT":11,"TAGS" :{ "EVENT_ONE_TYPE" : "20000","DIRECTION" : "内部","EVENT_TWO_TYPE" : "20100","EVENT_THREE_TYPE" : "20101"},"DEVICE_TYPE":"OSM","DIRECTION":"0"}
//    {"ORG_ID":"1","EVENT_NAME":"爬虫","EVENT_THREE_TYPE":"20399","SRC_PORT":"123","DST_PORT":"124","DST_IP":"10.16.254.13","SRC_IP":"50.115.134.50","CREATE_TIME":"2021-07-09T18:21:24.001Z","DEVICE_PARENT_TYPE":"TDA","SNOW_ID":"85512","EVENT_THREE_TYPE_DESC":"端口扫描","ts":"2021-05-27T16:06:58Z","ACCOUNT":"asap","collectionName":"bwdOMS","eRuleId":"0","RULE_TJ_COUNT":11,"TAGS" :{ "EVENT_ONE_TYPE" : "20000","DIRECTION" : "内部","EVENT_TWO_TYPE" : "20100","EVENT_THREE_TYPE" : "20101"},"DEVICE_TYPE":"OSM","DIRECTION":"0"}
//    {"ORG_ID":"1","EVENT_NAME":"爬虫","EVENT_THREE_TYPE":"20399","SRC_PORT":"123","DST_PORT":"124","DST_IP":"10.16.254.14","SRC_IP":"50.115.134.50","CREATE_TIME":"2021-07-09T18:22:24.001Z","DEVICE_PARENT_TYPE":"TDA","SNOW_ID":"85512","EVENT_THREE_TYPE_DESC":"端口扫描","ts":"2021-05-27T16:06:58Z","ACCOUNT":"asap","collectionName":"bwdOMS","eRuleId":"0","RULE_TJ_COUNT":11,"TAGS" :{ "EVENT_ONE_TYPE" : "20000","DIRECTION" : "内部","EVENT_TWO_TYPE" : "20100","EVENT_THREE_TYPE" : "20101"},"DEVICE_TYPE":"OSM","DIRECTION":"0"}
//    {"ORG_ID":"1","EVENT_NAME":"爬虫","EVENT_THREE_TYPE":"20399","SRC_PORT":"123","DST_PORT":"124","DST_IP":"10.16.254.16","SRC_IP":"50.115.134.50","CREATE_TIME":"2021-07-09T18:23:24.001Z","DEVICE_PARENT_TYPE":"TDA","SNOW_ID":"85512","EVENT_THREE_TYPE_DESC":"端口扫描","ts":"2021-05-27T16:06:58Z","ACCOUNT":"asap","collectionName":"bwdOMS","eRuleId":"0","RULE_TJ_COUNT":11,"TAGS" :{ "EVENT_ONE_TYPE" : "20000","DIRECTION" : "内部","EVENT_TWO_TYPE" : "20100","EVENT_THREE_TYPE" : "20101"},"DEVICE_TYPE":"OSM","DIRECTION":"0"}
//    {"ORG_ID":"1","EVENT_NAME":"爬虫","EVENT_THREE_TYPE":"20399","SRC_PORT":"123","DST_PORT":"124","DST_IP":"10.16.254.15","SRC_IP":"50.115.134.50","CREATE_TIME":"2021-07-09T18:26:25.001Z","DEVICE_PARENT_TYPE":"TDA","SNOW_ID":"85512","EVENT_THREE_TYPE_DESC":"端口扫描","ts":"2021-05-27T16:06:58Z","ACCOUNT":"asap","collectionName":"bwdOMS","eRuleId":"0","RULE_TJ_COUNT":11,"TAGS" :{ "EVENT_ONE_TYPE" : "20000","DIRECTION" : "内部","EVENT_TWO_TYPE" : "20100","EVENT_THREE_TYPE" : "20101"},"DEVICE_TYPE":"OSM","DIRECTION":"0"}
//    {"ORG_ID":"1","EVENT_NAME":"爬虫","EVENT_THREE_TYPE":"20399","SRC_PORT":"123","DST_PORT":"124","DST_IP":"10.16.254.11","SRC_IP":"50.115.134.50","CREATE_TIME":"2021-07-09T18:15:21.001Z","DEVICE_PARENT_TYPE":"IPS","SNOW_ID":"85512","EVENT_THREE_TYPE_DESC":"端口扫描","ts":"2021-05-27T16:06:58Z","ACCOUNT":"asap","collectionName":"bwdOMS","eRuleId":"0","RULE_TJ_COUNT":11,"TAGS" :{ "EVENT_ONE_TYPE" : "20000","DIRECTION" : "内部","EVENT_TWO_TYPE" : "20100","EVENT_THREE_TYPE" : "20101"},"DEVICE_TYPE":"OSM","DIRECTION":"0"}
//    {"ORG_ID":"1","EVENT_NAME":"爬虫","EVENT_THREE_TYPE":"20399","SRC_PORT":"123","DST_PORT":"124","DST_IP":"10.16.254.12","SRC_IP":"50.115.134.50","CREATE_TIME":"2021-07-09T18:20:23.001Z","DEVICE_PARENT_TYPE":"IPS","SNOW_ID":"85512","EVENT_THREE_TYPE_DESC":"端口扫描","ts":"2021-05-27T16:06:58Z","ACCOUNT":"asap","collectionName":"bwdOMS","eRuleId":"0","RULE_TJ_COUNT":11,"TAGS" :{ "EVENT_ONE_TYPE" : "20000","DIRECTION" : "内部","EVENT_TWO_TYPE" : "20100","EVENT_THREE_TYPE" : "20101"},"DEVICE_TYPE":"OSM","DIRECTION":"0"}
//    {"ORG_ID":"1","EVENT_NAME":"爬虫","EVENT_THREE_TYPE":"20399","SRC_PORT":"123","DST_PORT":"124","DST_IP":"10.16.254.13","SRC_IP":"50.115.134.50","CREATE_TIME":"2021-07-09T18:21:24.001Z","DEVICE_PARENT_TYPE":"IPS","SNOW_ID":"85512","EVENT_THREE_TYPE_DESC":"端口扫描","ts":"2021-05-27T16:06:58Z","ACCOUNT":"asap","collectionName":"bwdOMS","eRuleId":"0","RULE_TJ_COUNT":11,"TAGS" :{ "EVENT_ONE_TYPE" : "20000","DIRECTION" : "内部","EVENT_TWO_TYPE" : "20100","EVENT_THREE_TYPE" : "20101"},"DEVICE_TYPE":"OSM","DIRECTION":"0"}
//    {"ORG_ID":"1","EVENT_NAME":"爬虫","EVENT_THREE_TYPE":"20399","SRC_PORT":"123","DST_PORT":"124","DST_IP":"10.16.254.14","SRC_IP":"50.115.134.50","CREATE_TIME":"2021-07-09T18:22:24.001Z","DEVICE_PARENT_TYPE":"IPS","SNOW_ID":"85512","EVENT_THREE_TYPE_DESC":"端口扫描","ts":"2021-05-27T16:06:58Z","ACCOUNT":"asap","collectionName":"bwdOMS","eRuleId":"0","RULE_TJ_COUNT":11,"TAGS" :{ "EVENT_ONE_TYPE" : "20000","DIRECTION" : "内部","EVENT_TWO_TYPE" : "20100","EVENT_THREE_TYPE" : "20101"},"DEVICE_TYPE":"OSM","DIRECTION":"0"}
//    {"ORG_ID":"1","EVENT_NAME":"爬虫","EVENT_THREE_TYPE":"20399","SRC_PORT":"123","DST_PORT":"124","DST_IP":"10.16.254.16","SRC_IP":"50.115.134.50","CREATE_TIME":"2021-07-09T18:23:24.001Z","DEVICE_PARENT_TYPE":"IPS","SNOW_ID":"85512","EVENT_THREE_TYPE_DESC":"端口扫描","ts":"2021-05-27T16:06:58Z","ACCOUNT":"asap","collectionName":"bwdOMS","eRuleId":"0","RULE_TJ_COUNT":11,"TAGS" :{ "EVENT_ONE_TYPE" : "20000","DIRECTION" : "内部","EVENT_TWO_TYPE" : "20100","EVENT_THREE_TYPE" : "20101"},"DEVICE_TYPE":"OSM","DIRECTION":"0"}
//    {"ORG_ID":"1","EVENT_NAME":"爬虫","EVENT_THREE_TYPE":"20399","SRC_PORT":"123","DST_PORT":"124","DST_IP":"10.16.254.15","SRC_IP":"50.115.134.50","CREATE_TIME":"2021-07-09T18:26:25.001Z","DEVICE_PARENT_TYPE":"IPS","SNOW_ID":"85512","EVENT_THREE_TYPE_DESC":"端口扫描","ts":"2021-05-27T16:06:58Z","ACCOUNT":"asap","collectionName":"bwdOMS","eRuleId":"0","RULE_TJ_COUNT":11,"TAGS" :{ "EVENT_ONE_TYPE" : "20000","DIRECTION" : "内部","EVENT_TWO_TYPE" : "20100","EVENT_THREE_TYPE" : "20101"},"DEVICE_TYPE":"OSM","DIRECTION":"0"}

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment blinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
//		blinkStreamEnv.setParallelism(1);
        EnvironmentSettings blinkStreamSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment blinkStreamTableEnv = StreamTableEnvironment.create(blinkStreamEnv, blinkStreamSettings);

        String ddlSource = "CREATE TABLE asap_superset (\n" +
                "    EVENT_NAME STRING,\n" +
                "    SRC_PORT STRING,\n" +
                "    DST_PORT STRING,\n" +
                "    DST_IP STRING,\n" +
                "    SRC_IP STRING,\n" +
                "    CREATE_TIME TIMESTAMP(3),\n" +
                "    DEVICE_PARENT_TYPE STRING,\n" +
                "    SNOW_ID BIGINT, \n " +
                "    EVENT_THREE_TYPE_DESC STRING," +
                "    ts TIMESTAMP(3),\n" +
                "    pct AS PROCTIME(), \n" +
                "    WATERMARK FOR CREATE_TIME AS CREATE_TIME - INTERVAL '10' SECOND \n" +
                ") WITH (\n" +
                "    'connector.type' = 'kafka',\n" +
               "    'connector.version' = 'universal',\n" +
                "    'connector.topic' = 'flink_pressure_test',\n" +
                "    'connector.startup-mode' = 'latest-offset',\n" +
                "    'connector.properties.zookeeper.connect' = '192.168.1.25:1813',\n" +
                "    'connector.properties.bootstrap.servers' = '192.168.1.25:9093',\n" +
                "    'connector.properties.group.id' = 'distinct',\n" +
                "    'format.type' = 'json'\n" +
                ")";


        blinkStreamTableEnv.executeSql(ddlSource);
        String querySQL = "select DEVICE_PARENT_TYPE,DST_IP,SRC_IP from asap_superset where DEVICE_PARENT_TYPE='WAF' AND EVENT_NAME like '%爬虫%' group by TUMBLE(CREATE_TIME, INTERVAL '5' MINUTE),DEVICE_PARENT_TYPE,DST_IP,SRC_IP";
        Table table = blinkStreamTableEnv.sqlQuery(querySQL);
        blinkStreamTableEnv.toRetractStream(table, Row.class).print("query==");

        String querySQL1 = "select DEVICE_PARENT_TYPE,DST_IP,SRC_IP from asap_superset where DEVICE_PARENT_TYPE='TDA' AND EVENT_NAME like '%爬虫%' group by TUMBLE(CREATE_TIME, INTERVAL '5' MINUTE),DEVICE_PARENT_TYPE,DST_IP,SRC_IP";
        Table table1 = blinkStreamTableEnv.sqlQuery(querySQL1);
        blinkStreamTableEnv.toRetractStream(table1, Row.class).print("query1==");

        String querySQL2 = "select DEVICE_PARENT_TYPE,DST_IP,SRC_IP from asap_superset where DEVICE_PARENT_TYPE='IPS' AND EVENT_NAME like '%爬虫%' group by TUMBLE(CREATE_TIME, INTERVAL '5' MINUTE),DEVICE_PARENT_TYPE,DST_IP,SRC_IP";
        Table table2 = blinkStreamTableEnv.sqlQuery(querySQL2);
        blinkStreamTableEnv.toRetractStream(table2, Row.class).print("query2==");


        blinkStreamTableEnv.createTemporaryView("table1", table1);
        blinkStreamTableEnv.createTemporaryView("table2", table2);
        blinkStreamTableEnv.createTemporaryView("table3", table);

//        String querySQL5 = "select * from table3";
//        Table table5 = blinkStreamTableEnv.sqlQuery(querySQL5);
//        blinkStreamTableEnv.toRetractStream(table5, Row.class).print("query5==");
//
//        String querySQL6 = "select * from table1";
//        Table table6 = blinkStreamTableEnv.sqlQuery(querySQL6);
//        blinkStreamTableEnv.toRetractStream(table6, Row.class).print("query6==");
//
//        String querySQL7 = "select * from table2";
//        Table table7 = blinkStreamTableEnv.sqlQuery(querySQL7);
//        blinkStreamTableEnv.toRetractStream(table7, Row.class).print("query7==");

        String querySQL8 = "select * from table2 ,table1,table3 where table3.SRC_IP=table1.SRC_IP and table3.DST_IP=table2.DST_IP";
        Table table8 = blinkStreamTableEnv.sqlQuery(querySQL8);
        blinkStreamTableEnv.toRetractStream(table8, Row.class).print("query8==");


        blinkStreamEnv.execute("Blink Stream SQL distinct demo");

//		blinkStreamTableEnv.execute("Blink Stream SQL count/distinct demo");
    }
}
